home *** CD-ROM | disk | FTP | other *** search
/ Aminet 16 / Aminet 16 (1996)(GTI - Schatztruhe)[!][Dec 1996].iso / Aminet / comm / term / term_source.lha / Extras / Source / term-source.lha / MsgQueue.c < prev    next >
C/C++ Source or Header  |  1996-10-20  |  7KB  |  383 lines

  1. /*
  2. **    MsgQueue.c
  3. **
  4. **    Special MsgPort like queue management
  5. **
  6. **    Copyright © 1990-1996 by Olaf `Olsen' Barthel
  7. **        All Rights Reserved
  8. **
  9. **    :ts=4
  10. */
  11.  
  12. #ifndef _GLOBAL_H
  13. #include "Global.h"
  14. #endif
  15.  
  16.     /* DestroyPooled(struct MsgItem *Item):
  17.      *
  18.      *    Default destructor for CreateMsgItem().
  19.      */
  20.  
  21. STATIC VOID
  22. DestroyPooled(struct MsgItem *Item)
  23. {
  24.     FreeVecPooled(Item);
  25. }
  26.  
  27.     /* GetMsgItem(struct MsgQueue *Queue):
  28.      *
  29.      *    Get the next message from the queue handle,
  30.      *    similar to GetMsg().
  31.      */
  32.  
  33. APTR
  34. GetMsgItem(struct MsgQueue *Queue)
  35. {
  36.         /* Valid handle? */
  37.  
  38.     if(Queue)
  39.     {
  40.         APTR Item;
  41.  
  42.             /* Gain access */
  43.  
  44.         ObtainSemaphore(&Queue->Access);
  45.  
  46.             /* Any item available? */
  47.  
  48.         if(!IsListEmpty((struct List *)&Queue->MsgList))
  49.         {
  50.                 /* Remove the first item */
  51.  
  52.             Remove((struct Node *)(Item = Queue->MsgList.mlh_Head));
  53.  
  54.                 /* Are there any tasks waiting for the queue to become smaller? */
  55.  
  56.             if(!IsListEmpty((struct List *)&Queue->WaitList))
  57.             {
  58.                 struct SemaphoreRequest *WaitRequest = (struct SemaphoreRequest *)Queue->WaitList.mlh_Head;
  59.  
  60.                     /* Remove one waiting task from the list */
  61.  
  62.                 Remove((struct Node *)WaitRequest);
  63.  
  64.                     /* Wake the task up */
  65.  
  66.                 Signal(WaitRequest->sr_Waiter,SIGF_SINGLE);
  67.             }
  68.  
  69.                 /* One message taken */
  70.  
  71.             if(Queue->QueueSize)
  72.                 Queue->QueueSize--;
  73.         }
  74.         else
  75.             Item = NULL;
  76.  
  77.             /* Drop the semaphore */
  78.  
  79.         ReleaseSemaphore(&Queue->Access);
  80.  
  81.             /* Return the item read */
  82.  
  83.         return(Item);
  84.     }
  85.     else
  86.         return(NULL);
  87. }
  88.  
  89.     /* PutMsgItem(struct MsgQueue *Queue,struct MsgItem *Item):
  90.      *
  91.      *    Add a message item to a queue handle, similar to
  92.      *    PutMsg().
  93.      */
  94.  
  95. VOID
  96. PutMsgItem(struct MsgQueue *Queue,struct MsgItem *Item)
  97. {
  98.         /* Valid data? */
  99.  
  100.     if(Queue && Item)
  101.     {
  102.             /* Gain access to the handle */
  103.  
  104.         ObtainSemaphore(&Queue->Access);
  105.  
  106.             /* Are we to discard this message? */
  107.  
  108.         if(Queue->Discard)
  109.         {
  110.             ReleaseSemaphore(&Queue->Access);
  111.  
  112.             DeleteMsgItem(Item);
  113.         }
  114.         else
  115.         {
  116.                 /* Are we to watch for a maximum queue size? */
  117.  
  118.             if(Queue->MaxSize)
  119.             {
  120.                     /* Maximum queue size already reached? */
  121.  
  122.                 if(Queue->MaxSize == Queue->QueueSize)
  123.                 {
  124.                     struct SemaphoreRequest WaitRequest;
  125.  
  126.                         /* That's me */
  127.  
  128.                     WaitRequest.sr_Waiter = FindTask(NULL);
  129.  
  130.                         /* Add this task to the waiting list */
  131.  
  132.                     AddTail((struct List *)&Queue->WaitList,(struct Node *)&WaitRequest);
  133.  
  134.                         /* Careful, please */
  135.  
  136.                     Forbid();
  137.  
  138.                         /* Drop the semaphore */
  139.  
  140.                     ReleaseSemaphore(&Queue->Access);
  141.  
  142.                         /* Clear the one-shot flag */
  143.  
  144.                     ClrSignal(SIGF_SINGLE);
  145.  
  146.                         /* Wait for the queue to become smaller */
  147.  
  148.                     Wait(SIGF_SINGLE);
  149.  
  150.                         /* Gain access to the handle */
  151.  
  152.                     ObtainSemaphore(&Queue->Access);
  153.  
  154.                         /* Reenable multitasking */
  155.  
  156.                     Permit();
  157.                 }
  158.  
  159.                     /* We are going to make the queue longer */
  160.  
  161.                 Queue->QueueSize++;
  162.             }
  163.  
  164.                 /* Add the item to the list */
  165.  
  166.             AddTail((struct List *)&Queue->MsgList,(struct Node *)Item);
  167.  
  168.                 /* Wake up the owner */
  169.  
  170.             Signal(Queue->SigTask,Queue->SigMask);
  171.  
  172.                 /* Drop the semaphore */
  173.  
  174.             ReleaseSemaphore(&Queue->Access);
  175.         }
  176.     }
  177. }
  178.  
  179.     /* DeleteMsgItem(struct MsgItem *Item):
  180.      *
  181.      *    Clean up after a message item.
  182.      */
  183.  
  184. VOID
  185. DeleteMsgItem(struct MsgItem *Item)
  186. {
  187.         /* Valid data? */
  188.  
  189.     if(Item)
  190.     {
  191.             /* Do we have a destructor for this item? */
  192.  
  193.         if(Item->Destructor)
  194.             (*Item->Destructor)(Item);
  195.     }
  196. }
  197.  
  198.     /* CreateMsgItem(LONG Size):
  199.      *
  200.      *    Allocate a new message item.
  201.      */
  202.  
  203. struct MsgItem *
  204. CreateMsgItem(LONG Size)
  205. {
  206.     struct MsgItem    *Item;
  207.  
  208.         /* Allocate space for the item and add the default destructor */
  209.  
  210.     if(Item = (struct MsgItem *)AllocVecPooled(Size,MEMF_ANY))
  211.         Item->Destructor = DestroyPooled;
  212.  
  213.     return(Item);
  214. }
  215.  
  216.     /* UnlockMsgQueue(struct MsgQueue *Queue):
  217.      *
  218.      *    Wake up all tasks waiting for the queue to become
  219.      *    smaller. Note: this will change the maximum queue
  220.      *    size.
  221.      */
  222.  
  223. VOID
  224. UnlockMsgQueue(struct MsgQueue *Queue)
  225. {
  226.         /* Valid data? */
  227.  
  228.     if(Queue)
  229.     {
  230.         struct SemaphoreRequest *WaitRequest;
  231.  
  232.             /* Gain access to the handle */
  233.  
  234.         ObtainSemaphore(&Queue->Access);
  235.  
  236.             /* Notify all waiting tasks */
  237.  
  238.         for(WaitRequest = (struct SemaphoreRequest *)Queue->WaitList.mlh_Head ; WaitRequest->sr_Link.mln_Succ ; WaitRequest = (struct SemaphoreRequest *)WaitRequest->sr_Link.mln_Succ)
  239.             Signal(WaitRequest->sr_Waiter,SIGF_SINGLE);
  240.  
  241.             /* Reset the maximum queue size */
  242.  
  243.         Queue->MaxSize = 0;
  244.  
  245.             /* Drop the semaphore */
  246.  
  247.         ReleaseSemaphore(&Queue->Access);
  248.     }
  249. }
  250.  
  251.     /* DeleteMsgQueue(struct MsgQueue *Queue):
  252.      *
  253.      *    Free a queue handle.
  254.      */
  255.  
  256. VOID
  257. DeleteMsgQueue(struct MsgQueue *Queue)
  258. {
  259.         /* Valid data? */
  260.  
  261.     if(Queue)
  262.     {
  263.         struct MsgItem *Item,*Next;
  264.  
  265.             /* Make sure no tasks is waiting in the list */
  266.  
  267.         UnlockMsgQueue(Queue);
  268.  
  269.             /* Disable multitasking */
  270.  
  271.         Forbid();
  272.  
  273.             /* Clear the queue signal */
  274.  
  275.         ClrSignal(Queue->SigMask);
  276.  
  277.             /* Wait until all tasks are finished */
  278.  
  279.         while(Queue->QueueSize)
  280.             Wait(Queue->SigMask);
  281.  
  282.             /* Reenable multitasking */
  283.  
  284.         Permit();
  285.  
  286.             /* Remove each item from the list */
  287.  
  288.         for(Item = (struct MsgItem *)Queue->MsgList.mlh_Head ; Next = (struct MsgItem *)Item->MinNode.mln_Succ ; Item = Next)
  289.             DeleteMsgItem(Item);
  290.  
  291.             /* Free the signal bit if necessary */
  292.  
  293.         if(Queue->SigBit != -1)
  294.             FreeSignal(Queue->SigBit);
  295.  
  296.             /* Free the handle */
  297.  
  298.         FreeVecPooled(Queue);
  299.     }
  300. }
  301.  
  302.     /* CreateMsgQueue(ULONG SigMask,LONG MaxSize):
  303.      *
  304.      *    Allocate a queue handle.
  305.      */
  306.  
  307. struct MsgQueue *
  308. CreateMsgQueue(ULONG SigMask,LONG MaxSize)
  309. {
  310.     struct MsgQueue    *Queue;
  311.  
  312.         /* Allocate the queue handle */
  313.  
  314.     if(Queue = (struct MsgQueue *)AllocVecPooled(sizeof(struct MsgQueue),MEMF_ANY | MEMF_CLEAR))
  315.     {
  316.             /* Initialize the access semaphore */
  317.  
  318.         InitSemaphore(&Queue->Access);
  319.  
  320.             /* Initialize the message item list */
  321.  
  322.         NewList((struct List *)&Queue->MsgList);
  323.  
  324.             /* Reset the size data */
  325.  
  326.         Queue->QueueSize    = 0;
  327.         Queue->MaxSize        = MaxSize;
  328.  
  329.             /* Initialize the queue wait list */
  330.  
  331.         NewList((struct List *)&Queue->WaitList);
  332.  
  333.             /* Store the owner address */
  334.  
  335.         Queue->SigTask = FindTask(NULL);
  336.  
  337.             /* Do we have a signal mask ready? */
  338.  
  339.         if(SigMask)
  340.         {
  341.                 /* Use it */
  342.  
  343.             Queue->SigMask    = SigMask;
  344.             Queue->SigBit    = -1;
  345.         }
  346.         else
  347.         {
  348.                 /* Allocate a new signal */
  349.  
  350.             if((Queue->SigBit = AllocSignal(-1)) == -1)
  351.             {
  352.                 FreeVecPooled(Queue);
  353.  
  354.                 return(NULL);
  355.             }
  356.             else
  357.                 Queue->SigMask = (1L << Queue->SigBit);
  358.         }
  359.     }
  360.  
  361.     return(Queue);
  362. }
  363.  
  364.     /* SetQueueDiscard(struct MsgQueue *Queue,BOOLEAN Mode):
  365.      *
  366.      *    Set whether new items should be added to the
  367.      *    queue or whether they should rather get
  368.      *    discarded instead.
  369.      */
  370.  
  371. VOID
  372. SetQueueDiscard(struct MsgQueue *Queue,BOOL Mode)
  373. {
  374.     if(Queue)
  375.     {
  376.         ObtainSemaphore(&Queue->Access);
  377.  
  378.         Queue->Discard = Mode;
  379.  
  380.         ReleaseSemaphore(&Queue->Access);
  381.     }
  382. }
  383.